(ns com.biffweb.impl.xtdb2
  (:require
   [clojure.string :as str]
   [com.biffweb.impl.util :as util]
   [com.biffweb.aliases.xtdb2 :as xta]
   [malli.core :as malli]
   [malli.error :as malli.e]
   [malli.util :as malli.u])
  (:import
   [java.util UUID]
   [java.util.concurrent LinkedBlockingQueue TimeUnit]))

(def have-dep
  "True when `util/try-resolve` returns a non-nil value for the symbol
  `xtdb.api/execute-tx`; false otherwise. Evaluated once, when this
  namespace is loaded."
  (-> 'xtdb.api/execute-tx
      util/try-resolve
      some?))

(defn- get-conn
  "Calls `.createConnectionBuilder` on `node` and then `.build` on the
  resulting builder, returning whatever `.build` returns."
  [node]
  (-> node
      (.createConnectionBuilder)
      (.build)))

(defmacro ensure-dep
  "Guards `body` on the `have-dep` flag, which is checked at macro-expansion
  time. When the flag is truthy the macro expands to `(do body...)`;
  otherwise it expands to code that throws UnsupportedOperationException and
  `body` is dropped from the expansion entirely."
  [& body]
  (if have-dep
    `(do ~@body)
    `(throw (UnsupportedOperationException.
             "To call this function, you must add com.xtdb/xtdb-core v2 to your dependencies."))))

(defn where-clause
  "Builds the condition part of a SQL WHERE clause for the keys `ks`.

  Each key is converted with `xta/->normal-form-str` and turned into
  `<column> = ?`; the pieces are joined with ` and `. Returns the resulting
  string (empty when `ks` is empty). Wrapped in `ensure-dep`."
  [ks]
  (ensure-dep
   (let [conditions (for [k ks]
                      (str (xta/->normal-form-str k) " = ?"))]
     (str/join " and " conditions))))

(defn assert-unique
  "Returns a parameterized SQL vector `[sql & params]` asserting that at most
  one row of `table` matches every column/value pair in `kvs`.

  `table` is spliced into the SQL text as-is; the values of `kvs` are passed
  as positional `?` parameters in the same order as the keys used to build
  the where clause."
  [table kvs]
  (let [sql (str "assert 1 >= (select count(*) from " table " where "
                 (where-clause (keys kvs))
                 ;; Close the subquery opened before `select`; without this
                 ;; the generated statement has an unbalanced parenthesis.
                 ")")]
    (into [sql] (vals kvs))))

(defn select-from-where
  "Returns a parameterized SQL vector `[sql & params]` that selects `columns`
  from `table`, restricted to rows matching every column/value pair in `kvs`.

  Column names go through `xta/->normal-form-str`; `table` is spliced into
  the SQL text as-is; the values of `kvs` become the `?` parameters."
  [columns table kvs]
  (let [column-list (str/join ", " (mapv xta/->normal-form-str columns))
        conditions  (where-clause (keys kvs))
        sql         (str "select " column-list
                         " from " table
                         " where " conditions)]
    (into [sql] (vals kvs))))

(defn use-xtdb2-config
  "Builds a config map with `:log` and `:storage` entries from the system map.

  Each entry is a `[kind options]` pair. `:biff.xtdb2/log` may be `:local`
  or `:kafka`; `:biff.xtdb2/storage` may be `:local` or `:remote`; both
  default to `:local`. Any other value makes `case` throw. The storage
  secret key is always obtained by calling the `:biff/secret` function with
  `:biff.xtdb2.storage/secret-key`."
  [{:keys [biff/secret]
    :biff.xtdb2/keys [storage log]
    :biff.xtdb2.storage/keys [bucket endpoint access-key secret-key]
    :or {storage :local log :local}}]
  (let [;; Shadows the destructured `secret-key`: the value always comes from
        ;; the secret function, never directly from the system map.
        secret-key   (secret :biff.xtdb2.storage/secret-key)
        log-opts     (case log
                       :local {:path "storage/xtdb2/log"}
                       ;; The default prod config for Biff apps pairs remote
                       ;; storage with a local log, so kafka will most likely
                       ;; be adopted while migrating away from a local log.
                       ;; The epoch is therefore bumped pre-emptively.
                       :kafka {:bootstrap-servers "localhost:9092"
                               :topic "xtdb-log"
                               :epoch 1})
        credentials  {:access-key access-key
                      :secret-key secret-key}
        storage-opts (case storage
                       :local  {:path "storage/xtdb2/storage"}
                       :remote {:object-store [:s3
                                               {:bucket bucket
                                                :endpoint endpoint
                                                :credentials credentials}]
                                :local-disk-cache "storage/xtdb2/storage-cache"})]
    {:log     [log log-opts]
     :storage [storage storage-opts]}))

(defn all-system-times
  "Returns a lazy sequence of the `system_time` values of committed
  transactions in `xt.txs`, in ascending order, strictly after `after-inst`
  (defaulting to the Unix epoch).

  Results are fetched in pages of up to 1000 rows; the next page is
  requested, starting after the last value of the current one, only when the
  sequence is consumed that far. An empty page ends the sequence."
  ([node]
   (all-system-times node #xt/instant "1970-01-01T00:00:00Z"))
  ([node after-inst]
   (lazy-seq
    (let [fetched (into []
                        (map :system-time)
                        (xta/plan-q node
                                    ["select system_time from xt.txs
                                      where committed = true
                                      and system_time > ?
                                      order by system_time asc limit 1000"
                                     after-inst]))
          ;; Continue from the newest instant seen so far; nil when the page
          ;; came back empty, which terminates the sequence.
          more    (when-some [newest (peek fetched)]
                    (all-system-times node newest))]
      (concat fetched more)))))

(defn all-tables
  "Returns a vector of the `:table-name` values of all base tables reported
  by `information_schema.tables` whose `:table-schema` is \"public\"."
  [node]
  (let [sql     (str "SELECT table_schema, table_name "
                     "FROM information_schema.tables "
                     "WHERE table_type = 'BASE TABLE' AND "
                     "table_schema NOT IN ('pg_catalog', 'information_schema');")
        public? (fn [row] (= "public" (:table-schema row)))]
    (into []
          (comp (filter public?)
                (map :table-name))
          (xta/q node sql))))

(defn tx-log
  "Returns a lazy sequence of row versions written after `after-inst`.

  Options (keyword arguments or a trailing map):
    :tables     - table names to scan; defaults to `(all-tables node)`.
    :after-inst - lower bound passed to `all-system-times`; defaults to the
                  Unix epoch.

  System times are processed in batches of up to 1000. For each batch every
  table is queried (in parallel via `pmap`) across all system time for rows
  whose `_system_from` lies between the first and last instant of the batch,
  inclusive. Each row gets `:biff.xtdb/table` set to its table name, and the
  rows of a batch are sorted by `:xt/system-from`."
  [node & {:keys [tables after-inst]}]
  (let [after-inst    (or after-inst #xt/instant "1970-01-01T00:00:00Z")
        tables        (or tables (all-tables node))
        ;; All versions of rows in `table` whose system-from is in [lo, hi].
        table-rows    (fn [table lo hi]
                        (let [sql (str "select *, _system_from, _system_to, _valid_from, _valid_to "
                                       "from " table " for all system_time "
                                       "where _system_from >= ? "
                                       "and _system_from <= ? "
                                       "order by _system_from")]
                          (mapv (fn [row] (assoc row :biff.xtdb/table table))
                                (xta/q node [sql lo hi]))))
        ;; Rows from every table for one batch of system times, merged and
        ;; ordered by system-from.
        batch-records (fn [batch]
                        (let [lo (first batch)
                              hi (last batch)
                              per-table (pmap (fn [table] (table-rows table lo hi))
                                              tables)]
                          (sort-by :xt/system-from (apply concat per-table))))]
    (mapcat batch-records
            (partition-all 1000 (all-system-times node after-inst)))))

(defn latest-system-time
  "Returns the maximum `system_time` among committed transactions in
  `xt.txs`, read from the `:xt/column-1` key of the first result row.
  Returns nil when there is no first row or the key is absent."
  [node]
  (let [rows      (xta/q node "select max(system_time) from xt.txs where committed = true")
        first-row (get rows 0)]
    (get first-row :xt/column-1)))

(defn use-xtdb2-listener
  "Starts a background polling loop that feeds new transaction-log records to
  the `:on-tx` functions found in `@modules`.

  Before starting, blocks until two consecutive reads of
  `latest-system-time` (one second apart) agree, and uses that value as the
  starting point.

  The loop wakes up after at most one second, or earlier when the returned
  `:biff.xtdb.listener/poll-now` function is called. When there is at least
  one listener and the latest system time has changed, every record from
  `tx-log` since the previous system time is passed to every listener as
  `(listener ctx record)`; exceptions from an individual listener call are
  handled by `util/catchall-verbose` so the remaining calls still run.

  Returns `ctx` with `:biff.xtdb.listener/poll-now` added and a stop function
  conj'd onto `:biff/stop`. The stop function asks the loop to finish and
  waits up to 10 seconds for it to do so."
  [{:keys [biff/node biff/modules biff.xtdb.listener/tables] :as ctx}]
  (let [continue (atom true)
        done (promise)
        ;; Wait for system time to settle
        system-time (atom (loop [old-t nil
                                 new-t (latest-system-time node)]
                            (if (= old-t new-t)
                              new-t
                              (do
                                (Thread/sleep 1000)
                                (recur new-t (latest-system-time node))))))
        stop-fn (fn []
                  (reset! continue false)
                  (deref done 10000 nil))
        ;; Capacity-1 queue used purely as a wake-up signal: `.offer` is a
        ;; no-op when a wake-up is already pending.
        queue (LinkedBlockingQueue. 1)
        poll-now #(.offer queue true)]
    (future
      (try
        (util/catchall-verbose
         (while @continue
           (.poll queue 1 TimeUnit/SECONDS)
           (let [listeners (not-empty (keep :on-tx @modules))
                 prev-t @system-time
                 latest-t (when listeners
                            (latest-system-time node))]
             (when (and listeners (not= prev-t latest-t))
               (reset! system-time latest-t)
               (doseq [record (tx-log node {:after-inst prev-t :tables tables})
                       listener listeners]
                 (util/catchall-verbose (listener ctx record)))))))
        (finally
          ;; Always signal completion, even if something escapes the handler
          ;; above, so the stop function does not sit out its full timeout.
          (deliver done nil))))
    (-> ctx
        (assoc :biff.xtdb.listener/poll-now poll-now)
        (update :biff/stop conj stop-fn))))

(defn use-xtdb2
  "Starts a node via `xta/start-node` using the config produced by
  `use-xtdb2-config`, and returns `ctx` with the node under `:biff/node` and
  a zero-argument function that closes the node conj'd onto `:biff/stop`.
  Wrapped in `ensure-dep`."
  [ctx]
  (ensure-dep
   (let [node       (xta/start-node (use-xtdb2-config ctx))
         close-node (fn [] (.close node))
         with-node  (assoc ctx :biff/node node)]
     (update with-node :biff/stop conj close-node))))

(defn prefix-uuid
  "Returns a UUID whose string form is the first four characters of
  `(str uuid-prefix)` followed by everything after the first four characters
  of `(str uuid-rest)`."
  [uuid-prefix uuid-rest]
  (let [head (subs (str uuid-prefix) 0 4)
        tail (subs (str uuid-rest) 4)]
    (UUID/fromString (str head tail))))

(defn validate-tx
  "Validates the records of every `:put-docs` / `:patch-docs` operation in
  `tx` against the Malli schema for its table. Other operations are ignored.

  The table is the operation's second element when that is a keyword,
  otherwise its `:into` value. For `:patch-docs` the schema's keys are made
  optional first. Throws `ex-info` when a record has no `:xt/id` or does not
  match the schema; returns true otherwise."
  [tx malli-opts]
  (doseq [tx-op tx]
    (when (contains? #{:put-docs :patch-docs} (first tx-op))
      (let [[op target & docs] tx-op
            table       (if (keyword? target)
                          target
                          (:into target))
            base-schema (malli/schema table malli-opts)
            schema      (if (= op :patch-docs)
                          (malli.u/optional-keys base-schema)
                          base-schema)]
        (doseq [doc docs]
          (when (nil? (:xt/id doc))
            (throw (ex-info "Record is missing an :xt/id value."
                            {:table table
                             :record doc})))
          (when-not (malli/validate schema doc malli-opts)
            (let [explanation (malli.e/humanize (malli/explain schema doc))]
              (throw (ex-info "Record doesn't match schema."
                              {:table table
                               :record doc
                               :explain explanation}))))))))
  true)

(defn submit-tx
  "Validates `tx` with `validate-tx` (using the dereferenced
  `:biff/malli-opts`), submits it through `xta/submit-tx` with the optional
  `opts`, and then calls `:biff.xtdb.listener/poll-now` if the context has
  one. Returns the result of `xta/submit-tx`."
  [{:keys [biff/node biff.xtdb.listener/poll-now biff/malli-opts]} tx & [opts]]
  (validate-tx tx (deref malli-opts))
  (let [tx-result (xta/submit-tx node tx opts)]
    (if poll-now
      (do
        (poll-now)
        tx-result)
      tx-result)))
